[BREAKING] Python: Checkpoint refactor: encode/decode, checkpoint format, etc #3744
Pull request overview
Refactors Python workflow checkpointing to store live objects in WorkflowCheckpoint and defer serialization to storage backends (with a new pickle+base64 encoding strategy), while updating the runner/workflow APIs, samples, DevUI, and tests to the new checkpoint format and storage interfaces.
Changes:
- Redesign checkpoint payloads: replace `workflow_id` with `workflow_name` + `graph_signature_hash`, add `previous_checkpoint_id`, and store message/event objects directly.
- Update checkpoint storage APIs (`save`/`load`/`delete`/`get_latest`/`list_*`) and switch file persistence to JSON wrappers containing pickled payloads.
- Update workflow/runner checkpoint handling and adjust samples/tests/telemetry to the new semantics.
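To make the "JSON wrappers containing pickled payloads" idea concrete, here is a minimal sketch of a pickle+base64 round trip with a type check on decode. It is not the PR's implementation: the marker keys `__pickled__` and `__type__` and the helper names `encode_value` / `decode_value` are hypothetical, chosen only for illustration.

```python
import base64
import json
import pickle
from datetime import datetime

# Hypothetical marker keys; the names actually used in the PR are not shown here.
PICKLE_MARKER = "__pickled__"
TYPE_MARKER = "__type__"


def _type_name(value):
    """Return a fully qualified type name such as 'builtins.dict'."""
    cls = type(value)
    return f"{cls.__module__}.{cls.__qualname__}"


def encode_value(value):
    """Wrap an arbitrary picklable object in a JSON-safe dict."""
    raw = pickle.dumps(value)
    return {
        PICKLE_MARKER: base64.b64encode(raw).decode("ascii"),
        TYPE_MARKER: _type_name(value),
    }


def decode_value(wrapper):
    """Unpickle a wrapped value and verify it has the recorded type."""
    value = pickle.loads(base64.b64decode(wrapper[PICKLE_MARKER]))
    if _type_name(value) != wrapper[TYPE_MARKER]:
        raise TypeError(
            f"expected {wrapper[TYPE_MARKER]}, got {_type_name(value)}"
        )
    return value


# A payload that plain json.dumps cannot handle (datetime and set values).
original = {"sent_at": datetime(2024, 1, 1, 12, 0), "tags": {"a", "b"}}

text = json.dumps(encode_value(original))   # what a file backend could write
restored = decode_value(json.loads(text))   # what it could read back
```

As with any pickle-based format, a file written this way should only be loaded from a trusted source, because unpickling can execute arbitrary code.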
Reviewed changes
Copilot reviewed 43 out of 44 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| python/samples/getting_started/workflows/checkpoint/workflow_as_agent_checkpoint.py | Update checkpoint listing to use workflow.name. |
| python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py | Update checkpoint listing to use workflow.name. |
| python/samples/getting_started/workflows/checkpoint/handoff_with_tool_approval_checkpoint_resume.py | Remove old sample (moved/replaced). |
| python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py | Update checkpoint listing to use workflow.name. |
| python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py | Remove checkpoint summary usage; update listing to workflow.name. |
| python/samples/getting_started/orchestrations/magentic_checkpoint.py | Update checkpoint listing to use workflow.name / workflow_name. |
| python/samples/getting_started/orchestrations/handoff_with_tool_approval_checkpoint_resume.py | New orchestrations sample demonstrating resume with approvals using new APIs. |
| python/packages/orchestrations/tests/test_sequential.py | Update checkpoint API usage and selection logic for resume tests. |
| python/packages/orchestrations/tests/test_magentic.py | Update checkpoint API usage and selection logic; adjust load/delete API calls. |
| python/packages/orchestrations/tests/test_handoff.py | Update checkpoint listing to use workflow.name. |
| python/packages/orchestrations/tests/test_group_chat.py | Update checkpoint listing to use workflow.name. |
| python/packages/orchestrations/tests/test_concurrent.py | Update checkpoint API usage and selection logic for resume tests. |
| python/packages/orchestrations/agent_framework_orchestrations/_orchestration_state.py | Change orchestration checkpoint state to store live objects directly. |
| python/packages/orchestrations/agent_framework_orchestrations/_group_chat.py | Store live cache objects in executor checkpoint state. |
| python/packages/devui/agent_framework_devui/_server.py | Update DevUI delete API call to storage.delete(). |
| python/packages/devui/agent_framework_devui/_executor.py | Update DevUI checkpoint listing to filter by workflow.name. |
| python/packages/core/tests/workflow/test_workflow_observability.py | Update OTEL attribute expectations and checkpoint message assertions to object-based payloads. |
| python/packages/core/tests/workflow/test_workflow_agent.py | Update checkpoint listing to use workflow.name. |
| python/packages/core/tests/workflow/test_workflow.py | Update checkpoint model fields + storage API names; update exception assertions. |
| python/packages/core/tests/workflow/test_sub_workflow.py | Update checkpoint listing to use workflow.name. |
| python/packages/core/tests/workflow/test_serialization.py | Update expectation: workflow.name is always populated. |
| python/packages/core/tests/workflow/test_runner.py | Update Runner ctor signature; add extensive checkpoint/restore tests. |
| python/packages/core/tests/workflow/test_request_info_event_rehydrate.py | Rewrite tests around pickled checkpoint encoding and request_info restore behavior. |
| python/packages/core/tests/workflow/test_request_info_and_response.py | Remove duplicated checkpoint test (moved to rehydrate suite). |
| python/packages/core/tests/workflow/test_checkpoint_validation.py | Update checkpoint listing to use workflow.name. |
| python/packages/core/tests/workflow/test_checkpoint_encode.py | Update encoding tests for pickle marker/type marker approach. |
| python/packages/core/tests/workflow/test_checkpoint_decode.py | Update decode tests for pickle/type-marker verification. |
| python/packages/core/tests/workflow/test_checkpoint.py | Major expansion of storage roundtrip tests; new API names and checkpoint fields. |
| python/packages/core/tests/workflow/test_agent_executor.py | Update checkpoint listing and selection logic for restore test. |
| python/packages/core/agent_framework/observability.py | Add workflow builder OTEL attributes. |
| python/packages/core/agent_framework/_workflows/_workflow_executor.py | Store execution contexts directly; rely on workflow-level filtering for handled request_info events. |
| python/packages/core/agent_framework/_workflows/_workflow_builder.py | Always assign a builder name (UUID if omitted); update build telemetry attributes. |
| python/packages/core/agent_framework/_workflows/_workflow.py | Make name required; compute graph_signature_hash; filter request_info events when responses provided. |
| python/packages/core/agent_framework/_workflows/_runner_context.py | Change checkpoint creation payloads to store live objects; update checkpoint method signatures. |
| python/packages/core/agent_framework/_workflows/_runner.py | Pass workflow_name/graph hash into checkpoints; add previous-checkpoint chaining; remove legacy state hooks. |
| python/packages/core/agent_framework/_workflows/_events.py | Stop encoding/decoding request_info data in to_dict/from_dict (store live objects). |
| python/packages/core/agent_framework/_workflows/_conversation_state.py | Remove legacy chat message encode/decode helpers. |
| python/packages/core/agent_framework/_workflows/_checkpoint_summary.py | Remove checkpoint summary helper. |
| python/packages/core/agent_framework/_workflows/_checkpoint_encoding.py | Replace custom JSON encoding with pickle+base64 marker strategy + type verification. |
| python/packages/core/agent_framework/_workflows/_checkpoint.py | Redesign checkpoint schema + storage protocol; implement in-memory and file storage with new encoding. |
| python/packages/core/agent_framework/_workflows/_agent_executor.py | Store live conversation/cache + pending request structures in checkpoint state. |
| python/packages/core/agent_framework/_workflows/__init__.py | Remove checkpoint summary exports; keep updated checkpoint exports. |
| python/.cspell.json | Add checkpoint-related words. |
Comments suppressed due to low confidence (1)
python/packages/core/agent_framework/_workflows/_runner_context.py:230
`RunnerContext.load_checkpoint()` is declared (and documented) as returning `WorkflowCheckpoint | None`, but `InProcRunnerContext.load_checkpoint()` now returns a non-optional checkpoint and relies on storage raising when the checkpoint is missing. This is an API/typing mismatch that will confuse callers and force redundant `None` checks. Align the protocol and docs with the new behavior (raise `WorkflowCheckpointException` / return non-optional), and adjust call sites accordingly.
    async def load_checkpoint(self, checkpoint_id: CheckpointID) -> WorkflowCheckpoint | None:
        """Load a checkpoint without mutating the current context state.

        Args:
            checkpoint_id: The ID of the checkpoint to load.

        Returns:
            The loaded checkpoint, or None if it does not exist.
        """
Motivation and Context
Closes: #3530, #3529, #1665
Description
`WorkflowCheckpoint` now contains live objects instead of serialized JSON, which makes a checkpoint much easier to work with in code.
- `FileCheckpointStorage` now uses pickle.
- `InMemoryCheckpointStorage` now stores raw checkpoints (i.e. no serialization).
- Removed `workflow_id` from checkpoints.
- Added `previous_checkpoint_id` to checkpoints.
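The following sketch shows how these changes could fit together: a checkpoint that holds live objects, an in-memory store that keeps the checkpoint itself without serializing it, and `previous_checkpoint_id` used to walk back through history. The field names `workflow_name`, `graph_signature_hash`, and `previous_checkpoint_id` come from this PR; the `Checkpoint` and `RawInMemoryStorage` classes, the `state` and `checkpoint_id` fields, and the `history` helper are hypothetical simplifications.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class Checkpoint:
    """Simplified, hypothetical stand-in for WorkflowCheckpoint."""

    workflow_name: str
    graph_signature_hash: str
    previous_checkpoint_id: Optional[str] = None
    state: Dict[str, Any] = field(default_factory=dict)  # live objects, not JSON
    checkpoint_id: str = field(default_factory=lambda: str(uuid.uuid4()))


class RawInMemoryStorage:
    """Keeps checkpoint objects as-is, with no serialization step."""

    def __init__(self):
        self._by_id: Dict[str, Checkpoint] = {}

    def save(self, checkpoint: Checkpoint) -> str:
        self._by_id[checkpoint.checkpoint_id] = checkpoint
        return checkpoint.checkpoint_id

    def load(self, checkpoint_id: str) -> Checkpoint:
        return self._by_id[checkpoint_id]


def history(storage: RawInMemoryStorage, checkpoint_id: str) -> List[Checkpoint]:
    """Follow previous_checkpoint_id links from newest to oldest."""
    chain = []
    current: Optional[str] = checkpoint_id
    while current is not None:
        checkpoint = storage.load(current)
        chain.append(checkpoint)
        current = checkpoint.previous_checkpoint_id
    return chain


storage = RawInMemoryStorage()
first = Checkpoint("demo", "abc123", state={"messages": ["hello"]})
first_id = storage.save(first)
second = Checkpoint("demo", "abc123", previous_checkpoint_id=first_id)
second_id = storage.save(second)

chain = history(storage, second_id)
```

Because nothing is serialized, `storage.load(first_id)` returns the very object that was saved, so mutating it after saving would also change the stored checkpoint. A real backend may choose to copy on save to avoid that.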